Stock Trading
Decide whether to go long or short, predict price rises and falls, and maximize profit.
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import math
np.seterr(divide='ignore', invalid='ignore')  # suppress numpy divide-by-zero / invalid-value warnings
stock_data = pd.read_csv('stock(eng).csv', encoding='utf-8', thousands=',')
print("dataset_shape = ", stock_data.shape)
# Yuanta Taiwan 50, Yuanta High Dividend, Hon Hai, TSMC, MediaTek, Largan Precision, Fubon Financial, Cathay Financial, E.SUN Financial, Yuanta Financial
reputation = [37, 15, 43, 49, 44, 20, 25, 26, 22, 24]
# Rank each stock by its score (1 = lowest). A single argsort returns sort
# indices rather than per-element ranks, so argsort is applied twice.
reputation = np.argsort(np.argsort(reputation)) + 1
# 1402 trading days per stock (6 years)
reputation = np.repeat(reputation, 1402)
stock_data['Reputation'] = reputation
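A quick check of the double-argsort ranking on a toy list:
print(np.argsort(np.argsort([37, 15, 43])) + 1)  # [2 1 3]: 37 ranks 2nd, 15 ranks 1st, 43 ranks 3rd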
stock_data.head()
print("is_any_null ", stock_data.isnull().values.any())
# Drop columns that are more than 20% missing, then fill remaining gaps with column means.
stock_data = stock_data.dropna(thresh=int(len(stock_data.index) * 0.8), axis=1)
stock_data = stock_data.fillna(stock_data.mean(numeric_only=True))
stock_data.min(numeric_only=True)
stock_data.mean(numeric_only=True)
# Ad-hoc rescaling: divide down columns whose minimum or mean is very large so
# that all features end up on comparable scales.
min_too_big = stock_data.select_dtypes(include=np.number).min() > 1000
min_too_big_attribute = list(min_too_big[min_too_big].index)
stock_data[min_too_big_attribute] /= 1000
mean_too_large = stock_data.mean(numeric_only=True) > 1000000
mean_too_large_attribute = list(mean_too_large[mean_too_large].index)
stock_data[mean_too_large_attribute] /= 1000000
mean_too_big = stock_data.mean(numeric_only=True) > 1000
mean_too_big_attribute = list(mean_too_big[mean_too_big].index)
stock_data[mean_too_big_attribute] /= 1000
stock_data.mean(numeric_only=True)
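The divide-by-powers-of-ten above only coarsely aligns the feature scales. A minimal sketch of the standardization that the (otherwise unused) StandardScaler import pointed toward, applied to a copy so the pipeline above is unchanged:
from sklearn.preprocessing import StandardScaler

scaled = stock_data.copy()
numeric_cols = scaled.select_dtypes(include=np.number).columns
# Rescale every numeric column to zero mean and unit variance.
scaled[numeric_cols] = StandardScaler().fit_transform(scaled[numeric_cols])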
stock_data.head()
stock_data.tail()
stock_data.info()
stock_data.describe()
print(stock_data.nunique())
stock_data_cols = stock_data.columns
stock_data_cols_set = set(stock_data_cols)
stock_data_cols_list = list(stock_data_cols_set)
stock_data_continuous_variables_cols = stock_data.select_dtypes(include=np.number).columns
stock_data_continuous_variables_cols_set = set(stock_data_continuous_variables_cols)
stock_data_continuous_variables_cols_list = list(stock_data_continuous_variables_cols_set)
print("stock_data_continuous_variables: ", stock_data_continuous_variables_cols_list)
stock_data_categorical_variables_cols_set = stock_data_cols_set - stock_data_continuous_variables_cols_set
stock_data_categorical_variables_cols_list = list(stock_data_categorical_variables_cols_set)
print("stock_data_categorical_variables:", stock_data_categorical_variables_cols_list)
for i in stock_data_categorical_variables_cols_list:
    print(f'categorical variable: {i}\n{stock_data[i].value_counts()}\n')
for i in stock_data_categorical_variables_cols_list:
    print(f'categorical variable: {i}\n{stock_data[i].value_counts(normalize=True, sort=True)}\n')
# [stock_data[i].value_counts().plot.bar(title=f'Freq dist of {i}') for i in stock_data_categorical_variables_cols_list]
for stock_data_categorical_variables_col in stock_data_categorical_variables_cols_list:
    stock_data[stock_data_categorical_variables_col].value_counts().plot.bar()
    plt.title(f'Freq dist of {stock_data_categorical_variables_col}')
    plt.show()
for stock_data_continuous_variables_col in stock_data_continuous_variables_cols_list:
    stock_data[stock_data_continuous_variables_col].hist(figsize=(8, 8), bins=100)
    plt.title(stock_data_continuous_variables_col)
    plt.show()
stock_data_continuous_variables_cols_list_without_nan = []
for i in stock_data_continuous_variables_cols_list:
    if not stock_data[i].isnull().values.any():
        stock_data_continuous_variables_cols_list_without_nan.append(i)
fig, ax = plt.subplots(len(stock_data_continuous_variables_cols_list_without_nan), figsize=(16, 30))
for stock_data_idx, stock_data_continuous_variables_col in enumerate(stock_data_continuous_variables_cols_list_without_nan):
    # sns.distplot is deprecated; sns.histplot(..., kde=True) is the current equivalent.
    sns.histplot(stock_data[stock_data_continuous_variables_col], kde=True, ax=ax[stock_data_idx])
    ax[stock_data_idx].set_title('Freq dist ' + stock_data_continuous_variables_col, fontsize=20)
    ax[stock_data_idx].set_xlabel(stock_data_continuous_variables_col, fontsize=10)
    ax[stock_data_idx].set_ylabel('Count', fontsize=10)
plt.show()
sns.pairplot(stock_data[stock_data_continuous_variables_cols_list])
plt.show()
plt.figure(figsize=(10, 10))
sns.heatmap(stock_data.corr(numeric_only=True), annot=False, center=0.0, cmap='coolwarm');  # cmap="YlGnBu" is an alternative
plt.show()
# Build one sliding-window dataset per stock: each sample concatenates
# observe_date_long consecutive days of features, paired with that stock's ROI%.
stock_type = set(stock_data[stock_data.columns[0]])
observe_date_long = 30
col_name = []
stock_data_list = []
Y = []
for s in stock_type:
    col_name.append(s)
    df = stock_data[stock_data[stock_data.columns[0]] == s]
    df = df.dropna(axis='columns')
    df = df.drop(columns=stock_data.columns[0])
    df = df.drop(columns=stock_data.columns[1])
    df = df.drop(columns='Market')
    rep = np.unique(df['Reputation'])[0]
    df = df.drop(columns='Reputation')
    df = df.reset_index(drop=True)
    Y.append(df['ROI%'].iloc[observe_date_long:])
    df = df.drop(columns='ROI%')
    df = df.reset_index(drop=True)
    df_1 = df.copy()
    for i in range(observe_date_long - 1):
        # Shift df_1 up by one day (repeating the last row to keep the length)
        # and append the shifted copy as extra columns, so each row accumulates
        # a full observe_date_long-day window of features.
        df_new = df_1.drop(df_1.index[0], inplace=False)
        # DataFrame.append was removed in pandas 2.0; pd.concat replaces it.
        df_new = pd.concat([df_new, df_1.iloc[[-1]]])
        df_1 = df_1.reset_index(drop=True)
        df_new = df_new.reset_index(drop=True)
        df_1 = df_new.copy()
        df = pd.concat([df, df_1], axis=1)
    df['Reputation'] = rep
    df = df.drop(df.index[:observe_date_long], inplace=False)
    stock_data_list.append(df)
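The shifting loop above can be written more directly with DataFrame.shift. A minimal equivalent sketch (the helper name make_window is hypothetical; Reputation handling and target alignment are omitted), kept separate so the original pipeline is untouched:
def make_window(features, window=30):
    # Stack the frame with copies shifted up by 0..window-1 days; ffill()
    # repeats the last row into the tail, matching the loop above.
    shifted = [features.shift(-k).ffill() for k in range(window)]
    return pd.concat(shifted, axis=1).iloc[window:]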
stock_data_roll_window = pd.concat(stock_data_list, axis=0)
X = stock_data_list.copy()
train_x = []
test_x = []
train_y = []
test_y = []
n_test = 30
# Hold out the last n_test days of each stock as the test set (time-ordered split).
for i in range(len(stock_type)):
    train_x.append(X[i].iloc[:-n_test])
    test_x.append(X[i].iloc[-n_test:])
    train_y.append(Y[i][:-n_test])
    test_y.append(Y[i][-n_test:])
train_x = pd.concat(train_x, axis=0)
test_x = pd.concat(test_x, axis=0)
train_y = np.array(pd.concat(train_y, axis=0))
test_y = np.array(pd.concat(test_y, axis=0))
from sklearn.decomposition import PCA
# Grow the number of components until at least explained_ratio_threshold percent
# of the variance is explained for every stock's feature matrix.
n_com = 10
explained_ratio = 0
explained_ratio_threshold = 90
for i in range(len(stock_type)):
    covar_matrix = PCA(n_components=n_com)
    # calculate variance ratios
    covar_matrix.fit(X[i])
    cumulative_sum_of_variance_explained = np.cumsum(np.round(covar_matrix.explained_variance_ratio_, decimals=4) * 100)
    explained_ratio = cumulative_sum_of_variance_explained[-1]
    while explained_ratio < explained_ratio_threshold:
        n_com += 1
        covar_matrix = PCA(n_components=n_com)
        covar_matrix.fit(X[i])
        cumulative_sum_of_variance_explained = np.cumsum(np.round(covar_matrix.explained_variance_ratio_, decimals=4) * 100)
        explained_ratio = cumulative_sum_of_variance_explained[-1]
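scikit-learn can perform this search in a single call: a float n_components between 0 and 1 keeps just enough components to reach that variance fraction. A minimal sketch of the same selection:
from sklearn.decomposition import PCA

n_needed = max(PCA(n_components=0.90, svd_solver='full').fit(X[i]).n_components_
               for i in range(len(stock_type)))
print('components needed for 90% variance:', n_needed)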
X_pca = []
for i in range(len(stock_type)):
    pca = PCA(n_components=n_com)
    principalComponents = pca.fit_transform(X[i])
    print(col_name[i], 'explained variance ratio:', np.sum(pca.explained_variance_ratio_))
    X_pca.append(pd.DataFrame(data=principalComponents))
train_x_pca = []
test_x_pca = []
n_test = 30
for i in range(len(stock_type)):
    train_x_pca.append(X_pca[i].iloc[:-n_test])
    test_x_pca.append(X_pca[i].iloc[-n_test:])
train_x_pca = pd.concat(train_x_pca, axis=0)
test_x_pca = pd.concat(test_x_pca, axis=0)
from sklearn.utils import shuffle
# Shuffle the training rows jointly so features, PCA features, and targets stay aligned.
train_x, train_x_pca, train_y = shuffle(train_x, train_x_pca, train_y, random_state=0)
from sklearn import linear_model
import statsmodels.api as sm
# Fit the linear regression.
regr = linear_model.LinearRegression()
regr = regr.fit(train_x, train_y)
# Print the coefficients.
print('intercept:', regr.intercept_)
print('coef:', regr.coef_)
# Fit an OLS model with statsmodels as a cross-check; named ols_model so the
# Keras model defined below does not overwrite it.
X_OLS = sm.add_constant(train_x)
train_y = list(train_y)
ols_model = sm.OLS(train_y, X_OLS).fit()
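The fitted statsmodels result can then be inspected for coefficient estimates and p-values:
print(ols_model.summary())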
from sklearn.svm import LinearSVR
# Linear support-vector regression on the PCA-reduced features.
svm = LinearSVR()
svm = svm.fit(train_x_pca, train_y)
import keras
# A small feed-forward regression network: two hidden layers, one linear output.
model = keras.models.Sequential()
model.add(keras.layers.Dense(10, activation='selu'))
model.add(keras.layers.Dense(10, activation='relu'))
model.add(keras.layers.Dense(1))
model.compile(loss='mse', optimizer='adam', metrics=['mae'])
model.fit(np.array(train_x_pca), np.array(train_y), batch_size=256, epochs=20)
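A variant worth trying is training with a validation split and early stopping; a sketch under assumed hyperparameters (patience, split fraction, epoch cap), built as a separate es_model so the model above is left as trained:
es_model = keras.models.Sequential([
    keras.layers.Dense(10, activation='selu'),
    keras.layers.Dense(10, activation='relu'),
    keras.layers.Dense(1),
])
es_model.compile(loss='mse', optimizer='adam', metrics=['mae'])
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
es_model.fit(np.array(train_x_pca), np.array(train_y), batch_size=256, epochs=100,
             validation_split=0.1, callbacks=[early_stop])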
def long(test_day, n_test, stock_type1, stock_type2, stock_type3, money):
    # Go long with half the current capital, split 1/2, 1/4, 1/4 across the
    # three stocks the model ranks highest; the realized ROI looked up in
    # test_y decides how many of the positions clear the transaction fee.
    buy = 0.5 * money
    transaction_fee = 0.1 / 100
    ROI_1 = test_y[test_day + stock_type1 * n_test] / 100
    ROI_2 = test_y[test_day + stock_type2 * n_test] / 100
    ROI_3 = test_y[test_day + stock_type3 * n_test] / 100
    if ROI_3 > transaction_fee:
        money += 0.5 * buy * ROI_1
        money += 0.5 * 0.5 * buy * ROI_2
        money += 0.5 * 0.5 * buy * ROI_3
        money -= transaction_fee * buy
        # print('ROI:', ROI_1, ROI_2, ROI_3)
    elif ROI_2 > transaction_fee:
        money += 0.5 * buy * ROI_1
        money += 0.5 * buy * ROI_2
        money -= transaction_fee * buy
        # print('ROI:', ROI_1, ROI_2)
    elif ROI_1 > transaction_fee:
        money += buy * ROI_1
        money -= transaction_fee * buy
        # print('ROI:', ROI_1)
    print('money since long:', money)
    return money
def short(test_day, n_test, stock_type1, stock_type2, stock_type3, money):
    # Short with half the current capital, split 1/2, 1/4, 1/4 across the
    # three stocks ranked lowest; a short position gains when ROI is negative.
    sell = 0.5 * money
    transaction_fee = 0.1 / 100
    ROI_1 = test_y[test_day + stock_type1 * n_test] / 100
    ROI_2 = test_y[test_day + stock_type2 * n_test] / 100
    ROI_3 = test_y[test_day + stock_type3 * n_test] / 100
    if ROI_3 < -transaction_fee:
        money -= 0.5 * sell * ROI_1
        money -= 0.5 * 0.5 * sell * ROI_2
        money -= 0.5 * 0.5 * sell * ROI_3
        money -= transaction_fee * sell
        # print('ROI:', ROI_1, ROI_2, ROI_3)
    elif ROI_2 < -transaction_fee:
        money -= 0.5 * sell * ROI_1
        money -= 0.5 * sell * ROI_2
        money -= transaction_fee * sell
        # print('ROI:', ROI_1, ROI_2)
    elif ROI_1 < -transaction_fee:
        money -= sell * ROI_1
        money -= transaction_fee * sell
        # print('ROI:', ROI_1)
    print('money since short:', money)
    return money
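A worked example of the long-side arithmetic, with hypothetical numbers: with money = 10000, buy = 5000; if the three realized ROIs are 2%, 1%, and 0.5%, the position gains are 5000 * 0.5 * 0.02 = 50, 5000 * 0.25 * 0.01 = 12.5, and 5000 * 0.25 * 0.005 = 6.25, minus a fee of 5000 * 0.001 = 5, for a net gain of 63.75.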
money_std = 10000
def money_gain(n_test, pred_y, money_begin):
    money = money_begin
    print('day 0')
    print('money initial:', money)
    for test_day in range(n_test):
        # pred_y is laid out stock by stock in blocks of n_test days, so a
        # stride-n_test slice picks day test_day's prediction for every stock.
        pred_i = pred_y[test_day::n_test]
        sort_idx = np.argsort(pred_i)
        print('day', test_day + 1)
        # Go long on the three highest predicted ROIs and short the three lowest.
        money = long(test_day, n_test, sort_idx[-1], sort_idx[-2], sort_idx[-3], money)
        money = short(test_day, n_test, sort_idx[0], sort_idx[1], sort_idx[2], money)
    return money
pred_y_regr = regr.predict(test_x)
print('predicted rate of return:')
print(pred_y_regr)
money_regr = money_std
pred_y_regr = np.array(pred_y_regr)
money_regr = money_gain(n_test, pred_y_regr, money_regr)
print('final money with linear regression:', money_regr)
pred_y_svm = svm.predict(test_x_pca)
print('predicted rate of return:')
print(pred_y_svm)
money_svm = money_std
pred_y_svm = np.array(pred_y_svm)
money_svm = money_gain(n_test, pred_y_svm, money_svm)
print('final money with svm:', money_svm)
pred_y_dnn = model.predict(test_x_pca).flatten()
print('predicted rate of return:')
print(pred_y_dnn)
money_dnn = money_std
pred_y_dnn = np.array(pred_y_dnn)
money_dnn = money_gain(n_test, pred_y_dnn, money_dnn)
print('final money with dnn model:', money_dnn)
def get_label(n_test, y):
    # For each test day, list the stock indices in ascending order of ROI;
    # comparing these orderings shows how well each model ranks the stocks.
    label = []
    for test_day in range(n_test):
        sort_idx = y[test_day::n_test]
        label.append(np.argsort(sort_idx))
    label = np.array(label).flatten()
    return label
def get_mse(test_y, pred_y):
    # Mean squared error between realized and predicted ROI.
    mse = ((test_y - pred_y) ** 2).mean()
    return mse
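get_mse matches scikit-learn's built-in metric; a quick check:
from sklearn.metrics import mean_squared_error
assert np.isclose(get_mse(test_y, pred_y_regr), mean_squared_error(test_y, pred_y_regr))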
print('final money with linear regression:', money_regr)
print('final money with svm:', money_svm)
print('final money with dnn model:', money_dnn)
test_label = get_label(n_test, test_y)
pred_label_regr = get_label(n_test, pred_y_regr)
pred_label_svm = get_label(n_test, pred_y_svm)
pred_label_dnn = get_label(n_test, pred_y_dnn)
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, recall_score, precision_score
print('Confusion matrix')
print('linear regression:', confusion_matrix(test_label, pred_label_regr), sep='\n')
print('SVM:', confusion_matrix(test_label, pred_label_svm), sep='\n')
print('DNN:', confusion_matrix(test_label, pred_label_dnn), sep='\n')
print('Accuracy')
print('linear regression:', accuracy_score(test_label, pred_label_regr))
print('SVM:', accuracy_score(test_label, pred_label_svm))
print('DNN:', accuracy_score(test_label, pred_label_dnn))
print('Sensitivity (Recall)')
print('linear regression:', recall_score(test_label, pred_label_regr, average=None))
print('SVM:', recall_score(test_label, pred_label_svm, average=None))
print('DNN:', recall_score(test_label, pred_label_dnn, average=None))
print('Precision')
print('linear regression:', precision_score(test_label, pred_label_regr, average=None))
print('SVM:', precision_score(test_label, pred_label_svm, average=None))
print('DNN:', precision_score(test_label, pred_label_dnn, average=None))
print('Mean Square Error')
print('linear regression:', get_mse(test_y, pred_y_regr))
print('SVM:', get_mse(test_y, pred_y_svm))
print('DNN:', get_mse(test_y, pred_y_dnn))
# Summary table
print(f"{'':24}{'linear regression':>20}{'SVM':>12}{'DNN':>12}")
print(f"{'Accuracy:':24}{accuracy_score(test_label, pred_label_regr):>20.4f}"
      f"{accuracy_score(test_label, pred_label_svm):>12.4f}"
      f"{accuracy_score(test_label, pred_label_dnn):>12.4f}")
print(f"{'Sensitivity (Recall):':24}{recall_score(test_label, pred_label_regr, average='weighted'):>20.4f}"
      f"{recall_score(test_label, pred_label_svm, average='weighted'):>12.4f}"
      f"{recall_score(test_label, pred_label_dnn, average='weighted'):>12.4f}")
print(f"{'Precision:':24}{precision_score(test_label, pred_label_regr, average='weighted'):>20.4f}"
      f"{precision_score(test_label, pred_label_svm, average='weighted'):>12.4f}"
      f"{precision_score(test_label, pred_label_dnn, average='weighted'):>12.4f}")
print(f"{'Mean Square Error:':24}{get_mse(test_y, pred_y_regr):>20.4f}"
      f"{get_mse(test_y, pred_y_svm):>12.4f}"
      f"{get_mse(test_y, pred_y_dnn):>12.4f}")
